package com.cps.platform.service.hadoop.spi;

/**----------------------------------------------------------------
 // File:    DataToHbase.java
 //
 // Created: 2012-11-7
  *
  * Expects an input file at /home/cps/aaa.txt, e.g. containing:
  *
  *  ok1
  *  ok2
  *  ok3
  *  ok4
  *
 //----------------------------------------------------------------*/

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * MapReduce word-count job that writes its per-word totals into an
 * HBase table named {@code result} (created on first run).
 *
 * @version
 * @author
 */
public class DataToHbase {

	public static String tablename = "result";

	public static class MapHbase extends
			Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text text = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			StringTokenizer stiz = new StringTokenizer(line);
			while (stiz.hasMoreTokens()) {
				text.set(stiz.nextToken());
				context.write(text, one);
			}
		}

	}

	public static class ReduceHbase extends
			TableReducer<Text, IntWritable, Text> {

		public void reduce(Text text, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			Put put = null;
			String rowkey = "";
			int sum = 0;
			for (IntWritable i : values) {
				sum += i.get();
			}
			rowkey = String.valueOf(new Random().nextInt(100));
			put = new Put(rowkey.getBytes());
			put.add("infofamliy".getBytes(), "sum".getBytes(),
					(sum + "").getBytes());
			put.add("infofamliy".getBytes(), "text".getBytes(), text.toString()
					.getBytes());

			context.write(new Text(tablename), put);
		}

	}

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		conf.set("hbase.zookeeper.quorum", "lnxcps01");
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		// conf.set("tmpjars", "e:/aaa.jar");
		// System.setProperty("path.separator", ":");
		Job job;
		job = new Job(conf, "Runner"); //

		Path in = new Path("/home/cps/aaa.txt");
		job.setJarByClass(DataToHbase.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		job.setMapperClass(MapHbase.class);
		job.setReducerClass(ReduceHbase.class);
		FileInputFormat.setInputPaths(job, in);
		// org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job,
		// new Path("D:/testFile/mr1/" + new Date().getTime())) ;

		// create table
		HBaseAdmin admin = null;
		admin = new HBaseAdmin(conf);
		if (admin.tableExists(tablename)) {

		} else {
			HTableDescriptor tableDesc = new HTableDescriptor(tablename);
			tableDesc.addFamily(new HColumnDescriptor("infofamliy"));
			admin.createTable(tableDesc);

		}

		TableMapReduceUtil.initTableReducerJob(tablename, ReduceHbase.class,
				job);
		// com.hbase.TableMapReduceUtil.initMulitTableReducerJob("xchen1",ReduceHbase.class,
		// job, null, null, null,
		// null, true);

		job.waitForCompletion(true);
	}

}